package com.softwin.cryptocurrency.demo.hedging;

import com.softwin.cryptocurrency.demo.security.JwtCallCredential;
import com.softwin.grpc.common.Common;
import com.softwin.marketdata.gateway.service.MarketDataManageGrpc;
import com.softwin.marketdata.gateway.service.MarketGateway;
import com.softwin.order.reporting.service.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static com.softwin.cryptocurrency.demo.StrategyConstants.*;

/**
 * Created by Ao Yi on 2018/07/04.
 */

public class DepthHedging {
    private String host = "192.168.1.134"; //服务器地址 shi kang :192.168.1.134
    private String exchanges[]; // 交易所列表
    private Common.Symbol[] symbols; //交易币对列表
    private BigDecimal[][] initstockCurrencies; //记录两个账号初始的币对余额
    private BigDecimal[][] stockCurrencies; //记录两个账号当前的币对余额
    private BigDecimal currency; //账户数字货币余额
    private BigDecimal operationPrice; //下单价格
    private BigDecimal operationNum = new BigDecimal("0.0001"); //下单数量
    private BigDecimal depthAHighBids;  //交易所A最高买价
    private BigDecimal depthALowAsks;   //交易所A最低卖价
    private BigDecimal depthBHighBids;  //交易所B最高买价
    private BigDecimal depthBLowAsks;   //交易所B最低卖价
    private static MarketGateway.DepthsData depthA, depthB;
    private MarketGateway.Depth orderH; //最低卖价
    private MarketGateway.Depth orderL; // 最高买家
    private BigDecimal minSpreadA;    //交易所A到交易所B价格的最小背离
    private BigDecimal minSpreadB;    //交易所B到交易所A价格的最小背离
    private Map<String, String> orderMap = new HashMap<>(); //等待成交的报单，key为方向(buy/sell)，value为orderId
    private Long lastTradeTime;  //上一次下单的时间
    private String[][] accounts = new String[2][3];
    private OrderReportingGrpc.OrderReportingStub[] tgStubs; //订单网关列表
    private QueryServiceGrpc.QueryServiceStub[] queryStubs; //查询网关列表
    private MarketDataManageGrpc.MarketDataManageStub mgStub;
    private String baseCurreny;
    private String quoteCurrency;
    private int[][] orderState; //账号的买单卖单状态，0代表无单，1代表订单未完成，2代表订单完成
    private boolean realTrade = true;
    public static void main(String[] args) throws InterruptedException{
        DepthHedging teh = new DepthHedging(FIRST_EXCHANGE,SECOND_EXCHANGE, BASE_CURRENCY, QUOTE_CURRENCY);
        teh.subMarketData();
        while (true) {
            if(depthA != null && depthB != null){
                teh.checkBuySellPoint();
            }
            Thread.sleep(3000);
        }
    }

    private DepthHedging(String exchangeA, String exchangeB, String baseCurrency, String quoteCurrency){
        this.exchanges = new String[]{exchangeA, exchangeB};
        this.baseCurreny = baseCurrency;
        this.quoteCurrency = quoteCurrency;
        minSpreadA = new BigDecimal("0.52");
        minSpreadB = new BigDecimal("0.51");
        accounts[0][0] = "Test_Huobi_SubAccount_Order";
        accounts[0][1] = "4235"; //交易员用户名
        accounts[0][2] = "测试密码";
        accounts[1][0] = "testOkexSubAccount";
        accounts[1][1] = "4235"; //交易员用户名
        accounts[1][2] = "测试密码";
        initstockCurrencies = new BigDecimal[2][2];
        stockCurrencies = new BigDecimal[2][2];
        orderState = new int[2][2];
        symbols = new Common.Symbol[2];
        tgStubs = new OrderReportingGrpc.OrderReportingStub[2];
        queryStubs = new QueryServiceGrpc.QueryServiceStub[2];
        lastTradeTime = 0L;
        ManagedChannel mgChannel = ManagedChannelBuilder.forAddress(host, 8991).usePlaintext().build();
        ManagedChannel tgChannel = ManagedChannelBuilder.forAddress(host, 8993).usePlaintext().build();
        LoginGrpc.LoginBlockingStub loginBlockingStub;
        LoginOuterClass.RspLogin rspLogin;
        String token;
        for (int i = 0; i<accounts.length; i++) {
            loginBlockingStub = LoginGrpc.newBlockingStub(tgChannel);
            rspLogin = loginBlockingStub.login(LoginOuterClass.ReqLogin.newBuilder().setUsername(accounts[i][1]).setPassword(accounts[i][2]).build());
            token = rspLogin.getToken();
            tgStubs[i] = OrderReportingGrpc.newStub(tgChannel).withCallCredentials(new JwtCallCredential(token));
            queryStubs[i] = QueryServiceGrpc.newStub(tgChannel).withCallCredentials(new JwtCallCredential(token));
            symbols[i] = Common.Symbol.newBuilder().setQuoteCurrency(quoteCurrency).setBaseCurrency(baseCurrency).setExchangeType(exchanges[i]).build();
        }
        mgStub = MarketDataManageGrpc.newStub(Optional.ofNullable(mgChannel).get());
        init();

    }

    private void init(){
        for (int i=0;i<tgStubs.length;i++) {
            StreamObserver<Orderreporting.ReqSubTrade> reqSubTradeStreamObserver = tgStubs[i].subTrade(new StreamObserver<Orderreporting.RtnSubTrade>() {
                @Override
                public void onNext(Orderreporting.RtnSubTrade value) {
                    if (value.getTrade().getQuantity().equals(operationNum)) {
                        //如果交易发生在第一个交易所，则更新orderState[0]
                        if (value.getTrade().getSymbol().getExchangeType().equals(symbols[0].getExchangeType())) {
                            if (value.getTrade().getSide().equals("buy")) {
                                System.out.println(value.getTrade().getSymbol().getExchangeType() + ":买单已成交");
                                orderState[0][0] = 2;
                                stockCurrencies[0][0].add(operationNum);

                            } else if (value.getTrade().getSide().equals("sell")) {
                                System.out.println(value.getTrade().getSymbol().getExchangeType() + ":卖单已成交");
                                orderState[0][1] = 2;
                                stockCurrencies[0][0].subtract(operationNum);
                            }
                        } else { //否则交易发生在第二个交易所，更新orderState[1]
                            if (value.getTrade().getSide().equals("buy")) {
                                System.out.println(value.getTrade().getSymbol().getExchangeType() + ":买单已成交");
                                orderState[1][0] = 2;
                                stockCurrencies[1][0].add(operationNum);

                            } else if (value.getTrade().getSide().equals("sell")) {
                                System.out.println(value.getTrade().getSymbol().getExchangeType() + ":卖单已成交");
                                orderState[1][1] = 2;
                                stockCurrencies[1][0].subtract(operationNum);
                            }
                        }
                    }
                    if(initstockCurrencies[0][0] != null)
                        System.out.println("当前"+baseCurreny+"总量:" +(stockCurrencies[0][0].add(stockCurrencies[1][0])));
                }

                @Override
                public void onError(Throwable t) {

                }

                @Override
                public void onCompleted() {

                }
            });
            reqSubTradeStreamObserver.onNext(Orderreporting.ReqSubTrade.newBuilder().build());
        }



        for(int i=0; i<2; i++){
            //查询初始仓位
            queryStubs[i].getSubaccounts(Orderreporting.SubaccountRequest.newBuilder().setSubaccountCode(accounts[i][0]).build(), new StreamObserver<Orderreporting.SubaccountResponse>() {
                @Override
                public void onNext(Orderreporting.SubaccountResponse value) {
                    if (value.getData(0).getExchangeType().equals(symbols[0].getExchangeType())) {
                        if (value.getDataCount() == 1 && value.getData(0).getAssetBalanceCount() > 1) {
                            for(Orderreporting.AssetBalance assetBalance : value.getData(0).getAssetBalanceList()) {
                                if(assetBalance.getAsset().equals(BASE_CURRENCY)) {
                                    initstockCurrencies[0][0] = new BigDecimal(assetBalance.getFree());
                                    stockCurrencies[0][0] = new BigDecimal(initstockCurrencies[0][0].toString());
                                }
                                if(assetBalance.getAsset().equals(QUOTE_CURRENCY)) {
                                    initstockCurrencies[0][1] = new BigDecimal(assetBalance.getFree());
                                    stockCurrencies[0][1] = new BigDecimal(initstockCurrencies[0][0].toString());
                                }
                            }
                        } else {
                            initstockCurrencies[0][0] = BigDecimal.ZERO;
                            stockCurrencies[0][0] = BigDecimal.ZERO;
                        }
                        System.out.println(FIRST_EXCHANGE+"初始状态，"+BASE_CURRENCY+"余额： " + initstockCurrencies[0][0]+", "+QUOTE_CURRENCY+"余额： " + initstockCurrencies[0][1]);
                    } else {
                        if (value.getDataCount() == 1 && value.getData(0).getAssetBalanceCount() > 1) {
                            for(Orderreporting.AssetBalance assetBalance : value.getData(0).getAssetBalanceList()) {
                                if(assetBalance.getAsset().equals(BASE_CURRENCY)) {
                                    initstockCurrencies[1][0] = new BigDecimal(assetBalance.getFree());
                                    stockCurrencies[1][0] = new BigDecimal(initstockCurrencies[0][0].toString());

                                }
                                if (assetBalance.getAsset().equals(QUOTE_CURRENCY)) {
                                    initstockCurrencies[1][1] = new BigDecimal(assetBalance.getFree());
                                    stockCurrencies[1][1] = new BigDecimal(initstockCurrencies[0][0].toString());
                                }
                            }
                        } else {
                            initstockCurrencies[1][0] = BigDecimal.ZERO;
                            stockCurrencies[1][0] = BigDecimal.ZERO;
                        }
                        System.out.println(SECOND_EXCHANGE+"初始状态，"+BASE_CURRENCY+"余额： " + initstockCurrencies[1][0]+", "+QUOTE_CURRENCY+"余额： " + initstockCurrencies[1][1]);
                    }
                }


                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onCompleted() {

                }
            });
        }



    }
    /**
     * 调用接口进行限价买入，收到响应后将orderId放入orderMap中
     * @param accountIndex 子账号索引, 0代表第一个账号，1 代表后一个账号
     */
    private void buy(int accountIndex){
        orderState[accountIndex][0] = 1;
        if(realTrade) {
            final Orderreporting.InsertOrderRequest orderTradeRequest = Orderreporting.InsertOrderRequest.newBuilder().
                    setSubaccountCode(accounts[accountIndex][0]).setSymbol(symbols[accountIndex]).setQuantity(operationNum.toString())
                    .setPrice(operationPrice.toString()).setType("limit").setSide("buy").build();
            System.out.println("报单买入： 价格" + operationPrice.toString());
            orderMap.put("buy", "placeHolderOrderId");//为避免发起报单到收到响应的过程中行情会再次出发报单，先在map里放一个占位符
            tgStubs[accountIndex].insertOrder(orderTradeRequest, new StreamObserver<Orderreporting.InsertOrderResponse>() {
                @Override
                public void onNext(Orderreporting.InsertOrderResponse value) {
                    if (!value.getCRsp().getIsSuccess()) {
                        orderState[accountIndex][0] = 0;
                        System.out.println("买单报单失败: "+accountIndex + value.getCRsp().getErrMsg());
                        orderMap.remove("buy");
                    } else {
                        //收到响应后更新orderId
                        orderMap.put("buy", value.getOrderId());
                        System.out.println("买单报单成功：" + value.getOrderId());
                    }
                }

                @Override
                public void onError(Throwable t) {
                    System.out.println("买单报单失败: "+accountIndex + t.getMessage());
                    orderMap.remove("buy");
                }

                @Override
                public void onCompleted() {

                }
            });
        }
    }

    /**
     * 调用接口进行限价卖出，收到响应后将orderId放入orderMap中
     * @param accountIndex 子账号索引
     */
    private void sell(int accountIndex){
        if(realTrade) {
            orderState[accountIndex][1] = 1;
            final Orderreporting.InsertOrderRequest orderTradeRequest = Orderreporting.InsertOrderRequest.newBuilder().
                    setSubaccountCode(accounts[accountIndex][0]).setSymbol(symbols[accountIndex]).setQuantity(operationNum.toString())
                    .setPrice(operationPrice.toString()).setType("limit").setSide("sell").build();
            System.out.println("报单卖出： 价格" + operationPrice.toString());
            orderMap.put("sell", "placeHolderOrderId");//为避免发起报单到收到响应的过程中行情会再次出发报单，先在map里放一个占位符
            tgStubs[accountIndex].insertOrder(orderTradeRequest, new StreamObserver<Orderreporting.InsertOrderResponse>() {
                @Override
                public void onNext(Orderreporting.InsertOrderResponse value) {
                    if (!value.getCRsp().getIsSuccess()) {
                        orderState[accountIndex][1] = 0;
                        System.out.println("卖单报单失败: "+accountIndex + value.getCRsp().getErrMsg());
                        orderMap.remove("sell");
                    } else {
                        //收到响应后更新orderId
                        orderMap.put("sell", value.getOrderId());
                        System.out.println("卖单报单成功："+accountIndex + value.getOrderId());
                    }
                }

                @Override
                public void onError(Throwable t) {
                    System.out.println("卖单报单失败: "+accountIndex + t.getMessage());
                    orderMap.remove("sell");
                }

                @Override
                public void onCompleted() {

                }
            });
        }
    }

    /**
     * 撤单，收到响应后，将订单从orderMap中移除
     * @param orderId 订单ID
     * @param accountIndex 子账号索引
     */
    private void cancel(String orderId, int accountIndex) {
        System.out.println("取消");
        final Orderreporting.CancelOrderRequest cancelOrderRequest = Orderreporting.CancelOrderRequest.newBuilder().setOrderId(orderId).build();
        tgStubs[accountIndex].cancelOrder(cancelOrderRequest, new StreamObserver<Orderreporting.CancelOrderResponse>() {
            @Override
            public void onNext(Orderreporting.CancelOrderResponse value) {
                if(value.getCRsp().getIsSuccess()){
                    if(value.getData().getOrderId().equals(orderMap.get("buy"))){
                        orderState[accountIndex][0] = 0;
                        System.out.println("取消买单成功");
                        orderMap.remove("buy");
                    }
                }
                if(value.getCRsp().getIsSuccess()){
                    if(value.getData().getOrderId().equals(orderMap.get("sell"))){
                        orderState[accountIndex][1] = 0;
                        System.out.println("取消卖单成功");
                        orderMap.remove("sell");
                    }
                }
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {
            }
        });
    }
    /**
     * 订阅行情，保最近两个交易所的市场深度数据
     */
    private void subMarketData(){
        StreamObserver<MarketGateway.ReqSubDepth> requestObserver = mgStub.subDepth(new StreamObserver<MarketGateway.DepthsDataResponse>() {
            @Override
            public void onNext(MarketGateway.DepthsDataResponse value) {
                if (!value.getCRsp().getIsSuccess()){
                    return;
                }
                MarketGateway.DepthsData depthsData = value.getData();
//                System.out.println("交易所："+ value.getSymbol().getExchangeType());
                if(depthsData.getSymbol().getExchangeType().equals(exchanges[0]))
                    depthA = depthsData;
                else
                    depthB = depthsData;

            }

            @Override
            public void onError(Throwable t) {
                System.out.println("encounter error");
            }

            @Override
            public void onCompleted() {

            }
        });
        Common.Symbol symbol;
        MarketGateway.ReqSubDepth reqSubDepth;
        for (int i=0;i<accounts.length;i++) {
            reqSubDepth = MarketGateway.ReqSubDepth.newBuilder().setSymbol(symbols[i]).build();
            System.out.println("sub " + exchanges[i] + ": " +baseCurreny+ ","+ quoteCurrency);
            requestObserver.onNext(reqSubDepth);
        }
    }
    /**
     * 订阅行情，保最近两个交易所的市场深度数据
     * 如果A交易所的最高买单报价（max bids） > B交易所的最低卖单报价（min asks），且他们的差超过预设的阈值minSpreadA，则存在套利空间
     */
    private void checkBuySellPoint() {
        //检查上一次交易完成情况，如果订单超过60s未被处理，则取消所有订单，
        if(lastTradeTime>0){
            if(chekcOrderState()) {
                if((System.currentTimeMillis()-lastTradeTime) > 20000) {
                    cancelAllOrders();
                    lastTradeTime = 0L;
                }else{
                    return;
                }
            }
        }
        depthAHighBids = new BigDecimal(depthA.getBids(0).getPrice());
        depthALowAsks = new BigDecimal(depthA.getAsks(0).getPrice());
        depthBHighBids = new BigDecimal(depthB.getBids(0).getPrice());
        depthBLowAsks = new BigDecimal(depthB.getAsks(0).getPrice());
        System.out.println(depthAHighBids+"   "+depthBLowAsks);
        int exchangeH,exchangeL;
        //diffA ： A交易所买单最高价-B交易所卖单最低价
        BigDecimal diffA = depthAHighBids.subtract(depthBLowAsks).setScale(6, RoundingMode.HALF_EVEN);
        //diffB ： B交易所买价最高价-A交易所卖价最低价
        BigDecimal diffB = depthBHighBids.subtract(depthALowAsks).setScale(6, RoundingMode.HALF_EVEN);
        int HPos = 0;
        if (diffA.compareTo(minSpreadA) == 1) {
            orderH = depthA.getBids(0);    //定义下单最高价
            orderL = depthB.getAsks(0);    //定义下单最低价
            exchangeH = 0;              //交易所高价者 0
            exchangeL = 1;
        } else if (diffB.compareTo(minSpreadB) == 1) {
            HPos = 1;
            orderH = depthB.getBids(0);    //定义下单最高价
            orderL = depthA.getAsks(0);    //定义下单最低价
            exchangeH = 1;
            exchangeL = 0;
        } else {
            System.out.println("等待，暂无套利空间");
            return;
        }
        operationPrice = new BigDecimal(orderH.getPrice())
                        .add(new BigDecimal(orderL.getPrice()))
                        .divide(new BigDecimal("2.0"))
                        .setScale(2,RoundingMode.HALF_EVEN); //定义下单的价格
        operationNum = new BigDecimal("0.001");
        if (exchangeL == 0)
            buy(0);
        else if (exchangeL == 1)
            buy(1);
        if (exchangeH == 0)
            sell(0);
        else if (exchangeH == 1)
            sell(1);
        lastTradeTime = System.currentTimeMillis();
    }

    private boolean chekcOrderState(){
        return orderState[0][0] == 1 || orderState[0][1] == 1|| orderState[1][0] == 1 || orderState[1][1] == 1;
    }
    /**
     * 如果超过60秒上一笔报单没有完成，则对所有未完成保单进行撤单操作
     */
    private void cancelAllOrders(){
        for (int i=0; i<2; i++){
            for (int j=0; j<2; j++){
                if (orderState[i][j] == 1) {
                    if (j==0) {
                        cancel(orderMap.get("buy"), i);
                    } else {
                        cancel(orderMap.get("sell"), i);
                    }
                }
            }
        }
    }

}
